在微服务大行其道的今天,Spring Cloud Alibaba作为优秀的微服务实现,却不能很容易的集成ElasticAPM。本文就将解决的思路和实现,呈现给大家,希望能帮助大家。
前言 继上一篇ElasticAPM初体验 我们知道了什么是可观察性 ,并领略了ElasticAPM 的强大功能,但是仅仅是上篇文章中单机模式的使用时远远不够的。还记得上一篇最后提出的两个问题:
1、本文在单机版的环境中,测试通过,但是在分布式环境中,请求会串联起很多应用,那服务跟踪能否实现?实现的原理是什么?
2、Elastic APM可以自动采集http请求,在PRC分布式环境中,Elastic APM能否正常工作?是否必须采用 public API
来实现?
重点是分布式 和RPC ,即在分布式情况下,ElasticAPM能否良好工作?在RPC环境下,ElasticAPM是不是也能正常工作呢?
先说答案:在默认情况下,ElasticAPM能够支持分布式的Http方式调用,但是不支持RPC协议 。但是很多公司都采用RPC协议作为其内部系统的通信协议,比如我司就采用Spring Cloud Alibaba作为搜索服务的框架,框架内应用的通信是借助RPC框架Dubbo来实现的。所以问题就变成了如何把ElasticAPM 集成进Spring Cloud Alibaba中。
架构讲解&问题分析 首先,我先大概图示下Spring Cloud Alibaba和ElasticAPM的架构和工作流程。
如架构图所示,搜索系统分为了网关应用(Gateway) ,US应用 ,AS应用 ,BS应用 ,用户的请求会先到达网关,网关会把请求,以Http协议转发给US应用,US应用会采用Dubbo协议调用AS应用,AS应用采用Dubbo协议调用BS应用。
Request —http —>US —RPC —->AS —–RPC ——>BS
每一个应用启动的时候都已经集成了Apm-agent(如果不知道怎么集成请参考ElasticAPM初体验 ),如果APM-agent默认支持Dubbo 就完美了(但是并没有)。所以整个链路追踪,到了US之后,就没有上报之后应用的锚点数据。在查看ElasticAPM官方文档的时候,我注意到了Public API ,文档中交代了这样一件事情:
The public API of the Elastic APM Java agent lets you customize and manually create spans and transactions, as well as track errors.
没错,你可以自定义Span
和Transaction
,如果不懂什么是Span
和Transaction
请参考ElasticAPM初体验 或直接读一遍官方文档 。既然Agent默认不支持Dubbo,那么我们使用Public API来实现功能。
设计思想 基于Spring Cloud Alibaba的架构,我们可以如下图方式实现。
首先用户的请求一定要经过微服务网关,在网关的过滤器中,首先埋入父级Transaction
。
请求经过网关,会被网关转发到第一层应用中,注意这次转发是http
请求,如果是用SpringMVC实现的话,需要在Controller
处,上报子Transaction
。
请求被第一层应用处理之后,下层的应用全部是Dubbo
协议的。这时可以采用Dubbo的过滤器机制,对Concumer
和Provider
都进行拦截,通过这种方式做到不侵入业务代码。
最终,请求返回到微服务网关,调用transaction.end()
上报根Transaction
。
所有流程完毕。
核心实现讲解 微服务网关 微服务网关需要做这样几件事情:
开启根Transaction
POST
请求body中增加追踪ID,GET
请求Parameter
中增加追踪ID
在请求返回之后调用transaction.end()
完成上报
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 public Mono<Void> filter (ServerWebExchange exchange, GatewayFilterChain chain) { HttpMethod httpMethod = exchange.getRequest().getMethod(); Transaction transaction = ElasticApm.startTransaction(); transaction.setName("mainSearch" ); transaction.setType(Transaction.TYPE_REQUEST); Span span = transaction.startSpan("gateway" , "filter" , "gateway action" ); span.setName("com.mfw.search.gateway.filter.PostBodyCacheFilter#filter" ); LOGGER.info("APM埋点成功transactionId:{}" , transaction.getId()); if (HttpMethod.POST.equals(httpMethod)) { ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders); MediaType mediaType = exchange.getRequest().getHeaders().getContentType(); Mono<String> modifiedBody = serverRequest.bodyToMono(String.class).flatMap(body -> { if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) { Map<String, String> bodyMap = decodeBody(body); exchange.getAttributes().put(GatewayConstant.CACHE_POST_BODY, bodyMap); span.injectTraceHeaders((name, value) -> { bodyMap.put(name, value); LOGGER.info("APM埋点 key:{}, transactionId:{}" , name, value); }); span.end(); return Mono.just(encodeBody(bodyMap)); } else if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { Map<String, String> bodyMap = decodeJsonBody(body); exchange.getAttributes().put(GatewayConstant.CACHE_POST_BODY, bodyMap); span.injectTraceHeaders((name, value) -> { bodyMap.put(name, value); LOGGER.info("APM埋点 key:{}, transactionId:{}" , name, value); }); span.end(); return Mono.just(encodeJsonBody(bodyMap)); } return Mono.empty(); }); BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class); HttpHeaders headers = new HttpHeaders(); headers.putAll(exchange.getRequest().getHeaders()); headers.remove(HttpHeaders.CONTENT_LENGTH); CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers); return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> { ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(exchange.getRequest()) { public HttpHeaders getHeaders () { long contentLength = headers.getContentLength(); HttpHeaders httpHeaders = new HttpHeaders(); httpHeaders.putAll(super .getHeaders()); if (contentLength > 0 ) { httpHeaders.setContentLength(contentLength); } else { httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked" ); } return httpHeaders; } public Flux<DataBuffer> getBody () { return outputMessage.getBody(); } }; return chain.filter(exchange.mutate().request(decorator).build()).then(Mono.fromRunnable(() -> transaction.end())); })); } else if (HttpMethod.GET.equals(httpMethod)) { span.injectTraceHeaders((name, value) -> { exchange.getRequest().getQueryParams().set(name, transaction.getId()); LOGGER.info("APM埋点 key:{}, transactionId:{}" , name, value); }); return chain.filter(exchange).then(Mono.fromRunnable(() -> { span.end(); transaction.end(); LOGGER.info("APM买点完成,transactionId:{}" , transaction.getId()); })); } else { exchange.getResponse().setStatusCode(HttpStatus.UNSUPPORTED_MEDIA_TYPE); return exchange.getResponse().setComplete(); } }
Controller Controller层的实现采用了SpringAOP方式实现,这样的好处是对业务代码不侵入,可扩展性高,对想要监控的方法直接配置上@TransactionWithRemoteParent()
即可。
如下代码是通过@TransactionWithRemoteParent()
实现对Controller方法的上报。
1 2 3 4 5 6 @PostMapping (value = "/search" , consumes = "application/json" , produces = "application/json" )@TransactionWithRemoteParent ()public String searchForm (@RequestBody String req) { String result = asService.helloAs(req); return result; }
AOP实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 @Aspect public class ApmAspect { private static final Logger LOGGER = LoggerFactory.getLogger(ApmAspect.class); @PostConstruct private void init () { LOGGER.info("ApmAspect加载完毕" ); } @Pointcut (value = "@annotation(transactionWithRemoteParent)" , argNames = "transactionWithRemoteParent" ) public void pointcut (TransactionWithRemoteParent transactionWithRemoteParent) { } @Around (value = "pointcut(transactionWithRemoteParent)" , argNames = "joinPoint,transactionWithRemoteParent" ) public Object around (ProceedingJoinPoint joinPoint, TransactionWithRemoteParent transactionWithRemoteParent) throws Throwable { Transaction transaction = null ; try { MethodSignature signature = (MethodSignature) joinPoint.getSignature(); transaction = ElasticApm.startTransactionWithRemoteParent(key -> { String httpRequest = (String) joinPoint.getArgs()[0 ]; JSONObject json = JSON.parseObject(httpRequest); String traceId = json.getString(key); LOGGER.info("切面添加了子Transaction,key={},value={}" , key, traceId); RpcContext.getContext().setAttachment(key, traceId); return traceId; }); transaction.setName(StringUtils.isNotBlank(transactionWithRemoteParent.name()) ? transactionWithRemoteParent.name() : signature.getName()); transaction.setType(Transaction.TYPE_REQUEST); return joinPoint.proceed(); } catch (Throwable throwable) { if (transaction != null ) { transaction.captureException(throwable); } throw throwable; } finally { if (transaction != null ) { LOGGER.info("切面执行完毕,上报Transaction:{}" , transaction.getId()); transaction.end(); } } } }
Dubbo过滤器 如下代码是DubboConsumer过滤器,专门用于处理APM。DubboProvider的实现类似。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 @Activate (group = "consumer" )public class DubboConsumerApmFilter implements Filter { private static final Logger LOGGER = LoggerFactory.getLogger(DubboConsumerApmFilter.class); @Override public Result invoke (Invoker<?> invoker, Invocation invocation) throws RpcException { Transaction transaction = ElasticApm.startTransactionWithRemoteParent(key -> { String traceId = invocation.getAttachment(key); LOGGER.info("key={},value={}" , key, traceId); return traceId; }); try (final Scope scope = transaction.activate()) { String name = "consumer:" + invocation.getInvoker().getInterface().getName() + "#" + invocation.getMethodName(); transaction.setName(name); transaction.setType(Transaction.TYPE_REQUEST); Result result = invoker.invoke(invocation); return result; } catch (Exception e) { transaction.captureException(e); throw e; } finally { transaction.end(); } } } @Activate (group = "provider" )public class DubboProviderApmFilter implements Filter { @Override public Result invoke (Invoker<?> invoker, Invocation invocation) throws RpcException { Transaction transaction = ElasticApm.startTransactionWithRemoteParent(key -> invocation.getAttachment(key)); try (final Scope scope = transaction.activate()) { String name = "provider:" + invocation.getInvoker().getInterface().getName() + "#" + invocation.getMethodName(); transaction.setName(name); transaction.setType(Transaction.TYPE_REQUEST); return invoker.invoke(invocation); } catch (Exception e) { transaction.captureException(e); throw e; } finally { transaction.end(); } } }
效果
源代码 https://github.com/siyuanWang/springCloudAlibabaAPMDemo
参考文档 ElasticAPM集成Dubbo的讨论